
feat: add Composite Decoder #179

Open · wants to merge 19 commits into base: main

Conversation

artem1205
Contributor

@artem1205 artem1205 commented Dec 17, 2024

What

Resolves Source Amazon Seller Partner CDK migration.

Adds a composite decoder to apply decompressors and decoders sequentially.

Note

The memory usage test moved to unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py. JsonlDecoder (as well as other streamable decoders) should be deleted/replaced with CompositeRawDecoder + parser.

Summary by CodeRabbit


  • New Features

    • Introduced new parsers: GzipParser, JsonLineParser, and CsvParser.
    • Added CompositeRawDecoder for enhanced data decoding capabilities.
  • Bug Fixes

    • Updated SimpleRetriever to include the new CompositeRawDecoder.
  • Tests

    • Added unit tests for the CompositeRawDecoder and its parsers.
    • Introduced memory usage tests for JSON line decoders.
  • Chores

    • Removed obsolete memory usage test for JsonlDecoder.

@artem1205 artem1205 self-assigned this Dec 17, 2024
@github-actions github-actions bot added the enhancement New feature or request label Dec 17, 2024
Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 requested a review from maxi297 December 18, 2024 15:10
Contributor

@maxi297 maxi297 left a comment


I added a couple of questions to improve my understanding


@dataclass
class Parser(ABC):
    inner_parser: Optional["Parser"] = None
Contributor


It seems weird to me that the interface exposes an inner_parser as a public field. Should this be removed from the interface and the parser implementations will decide whatever they use internally?

Contributor Author


Should this be removed from the interface and the parser implementations will decide whatever they use internally?

This was added to the interface to support the declarative approach and to wrap parser-in-parser-in-parser as many times as needed, e.g.:

- decoder:
    type: CompositeRawDecoder
    parser:
      type: GzipParser
      inner_parser:
        type: parser1
        inner_parser:
          type: parser2
...

In my understanding, parent parsers do not know about the inner_parser implementation; all they do is pass the transformed object down through the pipeline.
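A minimal sketch of that early parser-in-parser idea (the leaf and wrapping parsers below are hypothetical illustrations, not the PR's actual classes):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generator, MutableMapping, Optional


@dataclass
class Parser(ABC):
    # In this early draft, every parser may wrap another parser and
    # simply passes the transformed object down the pipeline.
    inner_parser: Optional["Parser"] = None

    @abstractmethod
    def parse(self, data: Any) -> Generator[MutableMapping[str, Any], None, None]:
        ...


@dataclass
class UpperCaseParser(Parser):
    # Hypothetical leaf parser: turns each line into a record dict.
    def parse(self, data: Any) -> Generator[MutableMapping[str, Any], None, None]:
        for line in data:
            yield {"value": line.upper()}


@dataclass
class StripParser(Parser):
    # Hypothetical wrapping parser: strips whitespace, then delegates
    # to whatever inner parser it was composed with.
    def parse(self, data: Any) -> Generator[MutableMapping[str, Any], None, None]:
        yield from self.inner_parser.parse(s.strip() for s in data)


records = list(StripParser(inner_parser=UpperCaseParser()).parse(["  a \n", " b "]))
# records == [{"value": "A"}, {"value": "B"}]
```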

Contributor


My understanding is that it'll be part of the implementations but not the interface. So for example in declarative_component_schema.yaml, we will have:

  DefaultDecoder:
    title: Decoder with a parser
    type: object
    required:
      - type
      - parser
    properties:
      type:
        type: string
        enum: [DefaultDecoder]
      parser:
        anyOf:
          - "$ref": "#/definitions/CsvParser"
          - "$ref": "#/definitions/JsonlParser"
          <...>

  CsvParser:
    title: CSV Parser
    type: object
    required:
      - type
    properties:
      type:
        type: string
        enum: [CsvParser]
      <probably some CSV parsing options that we have for file based>
  GzipParser:
    title: GZIP Parser
    type: object
    required:
      - type
      - underlying_parser
    properties:
      type:
        type: string
        enum: [GzipParser]  
      underlying_parser:
        anyOf:
          - "$ref": "#/definitions/CsvParser"
          - "$ref": "#/definitions/JsonlParser"
          <...>

Each of these parser components will have its instantiation method in the model_to_component_factory, and if it requires an inner parser (like the GzipParser), we will instantiate that parser at that point. So the fact that a parser uses a parser underneath does not need to be exposed as part of the Python interface.
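The factory wiring described above might be sketched like this (plain dataclasses stand in for the real Pydantic models, and the class and method names here are assumptions for illustration, not the CDK's actual API):

```python
from dataclasses import dataclass
from typing import Optional


# Stand-ins for the schema-generated Pydantic models.
@dataclass
class CsvParserModel:
    delimiter: str = ","


@dataclass
class GzipParserModel:
    inner_parser: Optional[CsvParserModel] = None


# Stand-ins for the runtime components.
@dataclass
class CsvParser:
    delimiter: str


@dataclass
class GzipParser:
    inner_parser: CsvParser


class Factory:
    # Each model type maps to its own creation method. GzipParser's factory
    # method instantiates the inner parser here, so nesting stays a detail
    # of the factory rather than of the Python Parser interface.
    def create_csv_parser(self, model: CsvParserModel) -> CsvParser:
        return CsvParser(delimiter=model.delimiter)

    def create_gzip_parser(self, model: GzipParserModel) -> GzipParser:
        inner = self.create_csv_parser(model.inner_parser)
        return GzipParser(inner_parser=inner)


parser = Factory().create_gzip_parser(
    GzipParserModel(inner_parser=CsvParserModel(delimiter=";"))
)
# parser.inner_parser.delimiter == ";"
```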

Contributor Author


agreed, refactored


@abstractmethod
def parse(
    self, data: BufferedDataInput | Iterable, *args, **kwargs
Contributor


I'm a bit confused by this interface. Does that mean that all the users will have to check the data parameter as such:

    def parse(
        self, data: BufferedDataInput | Iterable, *args, **kwargs
    ) -> Generator[MutableMapping[str, Any], None, None]:
        if isinstance(data, BufferedDataInput):
            < do X >
        elif isinstance(data, Iterable):
            < do Y >
        else:
            raise ValueError(f"Unexpected type {type(data)}")

I also don't see the usage of BufferedDataInput in the sample parsers you provided below, maybe I missed it. Can you show me how this would be used?

Contributor Author


Does that mean that all the users will have to check the data parameter as such:

Usually not, because I expect the user to choose a valid parser sequence.

I also don't see the usage of BufferedDataInput in the sample parsers you provided below, maybe I missed it. Can you show me how this would be used?

We start processing by passing response.raw in

self.parser.parse(data=response.raw)

which is in fact a readable (buffered) object.

Any decompressor, like gzip or zip, returns a decompressor object, which in turn is also readable (buffered).
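The point about decompressors staying readable can be checked with the standard library alone; here an in-memory buffer stands in for response.raw:

```python
import gzip
import io
import json

# Simulate response.raw with an in-memory buffer of gzipped JSON Lines.
payload = b'{"id": 1}\n{"id": 2}\n'
buffer = io.BytesIO(gzip.compress(payload))

# gzip.GzipFile wraps the buffered object and is itself readable and
# iterable line by line, so it can be handed to the next parser in the
# chain unchanged.
decompressed = gzip.GzipFile(fileobj=buffer, mode="rb")
records = [json.loads(line) for line in decompressed]
# records == [{"id": 1}, {"id": 2}]
```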

Contributor

@maxi297 maxi297 Dec 18, 2024


Ok! I really like the idea of having a buffered reader as part of the interface. Maybe it should even be a bit more than that, meaning that the file-based CsvParser relies on more methods than just "read". Therefore, should it be IOBase instead of BufferedDataInput?

So I think my question now is: can we remove Iterable from the interface?

Contributor Author


I guess yes; refactored the interface to BufferedIOBase.
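A reduced sketch of the settled-on interface, with a toy leaf parser to exercise it (the PR's real JsonLineParser also handles encodings and decode errors, which are omitted here):

```python
import io
import json
from abc import ABC, abstractmethod
from io import BufferedIOBase
from typing import Any, Generator, MutableMapping


class Parser(ABC):
    @abstractmethod
    def parse(
        self, data: BufferedIOBase
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """Parse a buffered byte stream and yield record dictionaries."""


class JsonLineParser(Parser):
    # Toy leaf parser against the BufferedIOBase contract:
    # iterate the stream line by line and decode each line as JSON.
    def parse(
        self, data: BufferedIOBase
    ) -> Generator[MutableMapping[str, Any], None, None]:
        for line in data:
            yield json.loads(line)


# io.BytesIO is a BufferedIOBase, so it satisfies the interface directly.
records = list(JsonLineParser().parse(io.BytesIO(b'{"a": 1}\n{"a": 2}\n')))
# records == [{"a": 1}, {"a": 2}]
```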

Contributor

@maxi297 maxi297 left a comment


That's cool! I think this is a good improvement

class Parser(ABC):
    @abstractmethod
    def parse(
        self, data: BufferedIOBase, *args, **kwargs
Contributor


Just to make sure I understand: Why do we have *args, **kwargs in the interface?

Contributor Author


No reason (or real use-case) for now; it can be deleted.

@artem1205 artem1205 marked this pull request as ready for review December 23, 2024 20:09
Contributor

coderabbitai bot commented Dec 23, 2024

📝 Walkthrough

Walkthrough

This pull request introduces a new CompositeRawDecoder and associated parsers to enhance data decoding capabilities in the Airbyte CDK. The changes focus on adding flexible parsing strategies for different data formats, including Gzip-compressed files, JSON Lines, and CSV data. The implementation allows for more robust handling of various response types, with support for different encodings and parsing configurations.
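The overall flow the walkthrough describes can be sketched end to end with heavily simplified stand-ins for the PR's classes (the real components take their configuration from the declarative schema and operate on requests.Response):

```python
import csv
import gzip
import io


class CsvParser:
    # Minimal leaf parser: wraps the (already decompressed) binary
    # stream as text and yields one dict per CSV row.
    def __init__(self, delimiter=","):
        self.delimiter = delimiter

    def parse(self, data):
        text = io.TextIOWrapper(data, encoding="utf-8")
        yield from csv.DictReader(text, delimiter=self.delimiter)


class GzipParser:
    # Decompresses, then delegates the readable gzip object
    # to the inner parser.
    def __init__(self, inner_parser):
        self.inner_parser = inner_parser

    def parse(self, data):
        yield from self.inner_parser.parse(gzip.GzipFile(fileobj=data, mode="rb"))


class CompositeRawDecoder:
    def __init__(self, parser):
        self.parser = parser

    def decode(self, raw):
        # In the CDK this would receive requests.Response and pass
        # response.raw; here we accept the raw stream directly.
        yield from self.parser.parse(raw)


raw = io.BytesIO(gzip.compress(b"id,name\n1,foo\n2,bar\n"))
decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=CsvParser()))
records = list(decoder.decode(raw))
# records == [{"id": "1", "name": "foo"}, {"id": "2", "name": "bar"}]
```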

Changes

  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml: Added new parser types: CompositeRawDecoder, GzipParser, JsonLineParser, and CsvParser
  • airbyte_cdk/sources/declarative/decoders/__init__.py: Updated to include CompositeRawDecoder in module exports
  • airbyte_cdk/sources/declarative/decoders/composite_decoder.py: Implemented new decoder classes with parsing strategies for different data formats
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py: Added Pydantic models for new parser types
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: Added factory methods for creating new parser instances
  • unit_tests/sources/declarative/decoders/test_composite_decoder.py: Introduced unit tests for CompositeRawDecoder and its parsers
  • unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py: Added tests for memory usage of JSON line decoders
  • unit_tests/sources/declarative/decoders/test_json_decoder.py: Removed memory usage test for JsonlDecoder

Sequence Diagram

sequenceDiagram
    participant Response as HTTP Response
    participant Decoder as CompositeRawDecoder
    participant Parser as Selected Parser
    participant Result as Parsed Records

    Response->>Decoder: Receive raw response
    Decoder->>Parser: Select appropriate parser
    Parser->>Parser: Parse data (Gzip/JSON/CSV)
    Parser-->>Result: Generate record dictionaries

Suggested labels

bug

Suggested reviewers

  • maxi297
  • aldogonzalez8

Hey there! 👋 I noticed you've added some really cool parsing capabilities. Would you be interested in discussing how these new parsers might handle edge cases? Wdyt about adding some additional error handling or logging for malformed data? 🤔

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (20)
airbyte_cdk/sources/declarative/decoders/__init__.py (1)

10-10: Nice import of CompositeRawDecoder.
This addition expands the module's offerings. Perhaps we might include a docstring in the composite_decoder.py file to explain its use cases for future readers, wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (4)

18-19: Interface design looks clear.
Introducing Parser as an ABC is a neat approach to unify custom parsers. Perhaps we could add a short docstring on the intended usage or common parameters, so new contributors can adopt the pattern seamlessly, wdyt?


55-58: Potential optimization for error handling.
When JSON decoding fails, we log a warning and skip. This is good for resilience, but might it be helpful to have a configurable option to raise an exception if unexpected lines are encountered, wdyt?


76-83: Potential multi-chunk reading concern.
Reading large CSVs in chunks is effective. Could we consider memory usage limits or advanced chunk sizing if extremely large files are processed, wdyt?


95-104: CompositeRawDecoder usage clarity.
Implementation looks straightforward, but could we include an inline doc or example clarifying what happens if the underlying parser fails on a line (e.g., how are partial results handled), wdyt?

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (2)

25-31: Pragmatic approach for test data generation.
Generating 2 million lines is a hefty load. Should we consider adding an environment-based toggle or a smaller default for local testing, wdyt?


35-36: Tip on skipping slow tests.
We decorate this with @pytest.mark.slow. Maybe we can also provide a quick way to skip slow tests locally unless explicitly enabled, wdyt?

unit_tests/sources/declarative/decoders/test_composite_decoder.py (2)

55-67: Gzip CSV decoding test coverage is good!
Testing with multiple encodings ensures robust coverage. Could we also add a test scenario for missing headers or partial CSV lines, wdyt?


93-106: Gzip JSON lines test scenario is comprehensive.
We check repeated iteration logic and assert record counts. Might it be useful to test a scenario containing at least one invalid line within a gzipped bundle, wdyt?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)

1112-1114: Consider clarifying the class docstring.
Would you consider adding a short docstring to explain the JsonLineParser's usage? This can help future maintainers easily identify its purpose. wdyt?


1116-1121: Check consistency with other parsers.
You might consider adding docstrings or class-level comments like with JsonLineParser for uniformity. wdyt?


1536-1540: Allowing flexible parser composition.
Might it be helpful to add a small example or usage note on how the CompositeRawDecoder is configured, especially with nested formats? This could reduce confusion for new developers. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

146-148: CsvParserModel import.
Nice addition. Could we also provide a quick usage example in the docstring for create_csv_parser to guide developers? wdyt?


242-244: JsonLineParserModel import.
Everything looks aligned. Possibly add a note or reference in docstrings to ensure clarity for new maintainers. wdyt?


461-461: Adding CompositeRawDecoder constructor mapping.
This is a straightforward approach. Perhaps confirm we log or handle the scenario if a user tries to combine multiple decoders? wdyt?


1690-1695: Clean approach for JsonLineParser creation.
Consider adding input validation for cases where encoding is None to avoid runtime errors if a user sets it unexpectedly. wdyt?


1720-1725: CompositeRawDecoder creation method.
Perhaps log the selected parser type at runtime to aid debugging. wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

2726-2728: Experimental annotation in CompositeRawDecoder.
Would you consider a short note specifying potential changes to the interface, to caution users about relying on it immediately? wdyt?


2756-2761: JsonLineParser YAML entry.
Everything here is straightforward. Maybe add a quick usage snippet explaining how the encoding field is applied. wdyt?


2762-2774: CsvParser defaults.
Would you consider adding an example for unusual delimiters like "\t" or ";" to the documentation to make it more explicit? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 216cd43 and 5276ed1.

📒 Files selected for processing (8)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/decoders/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
  • unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
  • unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
  • unit_tests/sources/declarative/decoders/test_json_decoder.py (0 hunks)
💤 Files with no reviewable changes (1)
  • unit_tests/sources/declarative/decoders/test_json_decoder.py
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/decoders/__init__.py (1)

12-12: Exporting CompositeRawDecoder via all is a good step.
This ensures that the decoder is publicly accessible when importing from the package. No issues found here!

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)

33-33: Question about layering multiple GzipParsers.
Even though GzipParser can wrap another parser, might we want to add a quick guard that ensures we’re not repeatedly decompressing if the file is already decompressed, wdyt?

unit_tests/sources/declarative/decoders/test_composite_decoder.py (1)

108-119: Nice variety of encodings for JSON lines.
Everything looks smooth here. No further suggestions.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1501-1505: Fallback strategy for inner_parser.
Could it be valuable to define a fallback parser if none is specified? This might guard against unexpected null references if inner_parser is optional in future. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

70-75: Imports appear correct.
Everything looks consistent with the newly introduced classes from composite_decoder. Great job!


1716-1719: CSV parser constructor.
This is good. Could setting a default delimiter to "," be validated against potential multi-character delimiters or non-standard uses? wdyt?


134-136: Ensure alignment with newly added model.
Would you consider verifying that CompositeRawDecoderModel is properly referenced in all relevant factory methods? wdyt?

✅ Verification successful

Let me gather information about the usage of CompositeRawDecoderModel in the codebase.

This will help us understand:

  1. Where CompositeRawDecoderModel is used in factory methods
  2. All references to CompositeRawDecoderModel in the codebase
  3. All factory-related files that might need to reference this model

Let me check the test file to ensure proper test coverage for the CompositeRawDecoderModel.


Let me try one more search to look for any composite decoder related tests with a broader pattern.


CompositeRawDecoderModel is properly integrated in the factory

Based on the codebase analysis, the CompositeRawDecoderModel is correctly referenced in all necessary places within the model_to_component_factory.py:

  • It's properly imported from the schema
  • Registered in the factory mapping to its creation method
  • Has a dedicated factory method create_composite_raw_decoder with correct type hints

However, I notice there might be missing test coverage. Would you consider adding tests for the composite raw decoder factory method to ensure its reliability? wdyt?

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for CompositeRawDecoderModel usage in factory methods
ast-grep --pattern 'def $_(self, $_, CompositeRawDecoderModel, $_) { $$$ }'

# Search for any references to CompositeRawDecoderModel
rg "CompositeRawDecoderModel" -A 2

# Look for factory-related files that might need to reference this model
fd "factory" --type f

Length of output: 1464


Script:

#!/bin/bash
# Check test coverage for CompositeRawDecoderModel
rg "CompositeRawDecoderModel" unit_tests/sources/declarative/parsers/test_model_to_component_factory.py -A 3

# Look for test cases related to composite raw decoder
ast-grep --pattern 'def test_$_composite_raw_decoder$_($$$) { $$$ }' unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Length of output: 253


Script:

#!/bin/bash
# Search for any composite decoder related tests with a broader pattern
rg "composite.*decoder" -i unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

# Also check the actual implementation details of the factory method
rg "create_composite_raw_decoder" -A 5 airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Length of output: 920


215-217: Confirm references to GzipParserModel.
Just to be sure: do we need specialized error handling or fallback for mismatched references to GzipParserModel? wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2722-2722: Reference to CompositeRawDecoder.
Looks good for including CompositeRawDecoder in the SimpleRetriever’s decoder list. Have you verified any potential conflicts with existing decoders? wdyt?

✅ Verification successful

Let me verify the potential conflicts with existing decoders by searching through the codebase.


Let me search for more specific information about the CompositeRawDecoder and its parsers.


CompositeRawDecoder looks compatible with existing decoders

Based on the schema definitions, the CompositeRawDecoder is designed to work with a specific set of parsers (GzipParser, JsonLineParser, CsvParser) in a composable way. There are no conflicts visible as:

  • Each parser has well-defined, non-overlapping responsibilities
  • GzipParser is explicitly designed to work with inner parsers (JsonLineParser or CsvParser)
  • The schema enforces these relationships through proper type definitions
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for decoder definitions in the schema file
rg -A 5 "CompositeRawDecoder|RawDecoder" airbyte_cdk/sources/declarative/declarative_component_schema.yaml

# Search for any decoder implementations or references
ast-grep --pattern 'class $_ implements RawDecoder'
ast-grep --pattern 'class $_ extends RawDecoder'
ast-grep --pattern 'class CompositeRawDecoder'

# Look for any usage patterns of decoders
rg -A 3 "decoder.*:" airbyte_cdk/sources/declarative/

Length of output: 17586


Script:

#!/bin/bash
# Search for parser definitions that might be used with CompositeRawDecoder
rg -A 10 "GzipParser|JsonLineParser|CsvParser" airbyte_cdk/sources/declarative/declarative_component_schema.yaml

# Look for any implementations or usages of these parsers
ast-grep --pattern 'class $_(GzipParser|JsonLineParser|CsvParser)'
rg "type.*: .*Parser" airbyte_cdk/sources/declarative/

Length of output: 1372

Signed-off-by: Artem Inzhyyants <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1690-1695: Consider handling None encoding?

Would you like to ensure there's a safe default or check when model.encoding is None, so we avoid unexpected decoding errors? wdyt?


1718-1721: Handle defaults for CSV parser options?

Would you like to set a fallback if model.delimiter or model.encoding is None, or raise a clearer error? wdyt?


1722-1727: Validate parser configuration?

Would you be open to adding a quick check if model.parser is None or invalid? This may prevent unexpected runtime errors. wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)

18-29: Naming consideration for Parser?

Would you consider something more descriptive like "DataParser"? Or are you comfortable with the generic name? wdyt?


47-61: Graceful handling of malformed JSON lines.

You currently skip lines that aren’t valid JSON. If you anticipate a high failure rate, would it be helpful to optionally stop or store such lines? wdyt?


63-83: About the TODO note.

There’s a TODO: "migrate implementation to re-use file-base classes." Would you like help creating a GitHub issue, or would you prefer to add the changes in this PR? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5276ed1 and adf9d3d.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🔇 Additional comments (12)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)

70-75: Looks good!

The imports from "composite_decoder" appear consistent with the rest of the codebase, nicely grouped, and concise.


134-136: No issues spotted.

Adding "CompositeRawDecoderModel" to the imports for usage in the mappings is straightforward.


146-148: All set here.

The "CsvParserModel" import nicely aligns with the rest of the decoders.


215-217: Imports look appropriate.

"GzipParserModel" is properly added for new parser creation.


242-244: No concerns here.

"JsonLineParserModel" import is consistent with the design.


461-461: Clean addition.

The "CompositeRawDecoderModel" mapped to "self.create_composite_raw_decoder" in "PYDANTIC_MODEL_TO_CONSTRUCTOR" ensures correct wiring.


492-492: Well-integrated.

"JsonLineParserModel" is mapped via the new create method.


494-494: All good.

"GzipParserModel" is properly linked to the factory method.


1712-1717: Raise an error for missing inner_parser?

Would it help to log or raise an error if the inner_parser is missing or invalid, so the user knows immediately? wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)

1-12: Imports appear concise and purposeful.

The dependencies (gzip, json, logging, pandas, requests, etc.) match the intended functionality here.


31-44: Potential memory usage check for GzipParser?

Would you like to confirm that the GzipFile approach handles large files gracefully, or possibly add chunk-based reading if needed? wdyt?


85-104: CompositeRawDecoder implementation appears solid.

The streaming approach to parse data from response.raw looks good. No major issues spotted.

Signed-off-by: Artem Inzhyyants <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (5)

18-29: How about enhancing the docstring for better clarity? wdyt?

The docstring could provide more details about:

  • Expected format/structure of the yielded dictionaries
  • Any specific error handling expectations
  • Whether the parser should close the input stream

Example:

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
         """
-        Parse data and yield dictionaries.
+        Parse data from a buffered stream and yield dictionaries.
+
+        Args:
+            data: A buffered stream containing the data to parse
+
+        Returns:
+            A generator yielding parsed records as dictionaries
+
+        Raises:
+            ValueError: If the data cannot be parsed according to the expected format
         """
         pass

35-43: Should we add error handling for corrupted gzip data? wdyt?

The current implementation might raise uncaught exceptions for corrupted gzip data. Consider wrapping the gzip operations in a try-except block:

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
-        gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
-        yield from self.inner_parser.parse(gzipobj)
+        try:
+            with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+                yield from self.inner_parser.parse(gzipobj)
+        except gzip.BadGzipFile as e:
+            raise ValueError(f"Failed to decompress gzip data: {e}")

50-59: How about enhancing the error logging with more context? wdyt?

The current warning message could be more helpful by including line numbers and truncated content:

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
-        for line in data:
+        for line_num, line in enumerate(data, 1):
             try:
                 yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
             except json.JSONDecodeError:
-                logger.warning(f"Cannot decode/parse line {line!r} as JSON")
+                decoded_line = line.decode(encoding=self.encoding or "utf-8")
+                preview = decoded_line[:100] + "..." if len(decoded_line) > 100 else decoded_line
+                logger.warning(
+                    f"Cannot parse line {line_num} as JSON. Preview: {preview!r}"
+                )

67-80: Consider using csv.DictReader instead of pandas for simpler CSV parsing? wdyt?

Using pandas for basic CSV parsing might be overkill, especially since we're only using basic features. The standard library's csv.DictReader could be more lightweight:

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
-        reader = pd.read_csv(  # type: ignore
-            data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding
-        )
-        for chunk in reader:
-            chunk = chunk.replace({nan: None}).to_dict(orient="records")
-            for row in chunk:
-                yield row
+        text_data = data.read().decode(encoding=self.encoding or "utf-8")
+        csv_reader = csv.DictReader(
+            text_data.splitlines(),
+            delimiter=self.delimiter
+        )
+        yield from csv_reader

This would:

  • Remove the pandas dependency
  • Simplify the code
  • Avoid pandas' NaN sentinel entirely (csv.DictReader yields plain strings, so missing fields come back as empty strings rather than NaN)

One trade-off to weigh: data.read() loads the whole stream into memory, giving up the chunked iteration that pd.read_csv(iterator=True) provided.
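
For concreteness, a minimal stand-in for the proposed approach, with `io.BytesIO` playing the role of the buffered response stream. Note that `DictReader` returns strings, with empty strings for fields missing from a row:

```python
import csv
import io

# io.BytesIO stands in for the buffered HTTP response stream.
raw = io.BytesIO(b"id,name\n1,alpha\n2,\n")
text = raw.read().decode("utf-8")
rows = list(csv.DictReader(text.splitlines(), delimiter=","))
# rows == [{"id": "1", "name": "alpha"}, {"id": "2", "name": ""}]
```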

83-102: Should we improve type safety and documentation for response.raw handling? wdyt?

The # type: ignore[arg-type] suggests a type mismatch between response.raw and BufferedIOBase. Consider:

  1. Adding type hints to clarify the expected type:
     def decode(
         self, response: requests.Response
     ) -> Generator[MutableMapping[str, Any], None, None]:
+        # response.raw is a urllib3.response.HTTPResponse which is compatible with BufferedIOBase
         yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
  1. Enhancing the class docstring with details about response.raw handling:
     """
     Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
     passed response.raw to parser(s).
     Note: response.raw is not decoded/decompressed by default.
     parsers should be instantiated recursively.
+
+    Important:
+    - The response.raw attribute is a raw HTTP response from urllib3
+    - It must be read in binary mode to properly handle compression
+    - The stream is automatically closed when fully consumed
+
     Example:
     composite_decoder = CompositeDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
     """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between adf9d3d and 7604b99.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)

1-16: LGTM! Clean and well-organized imports.

The imports are logically grouped and all seem necessary for the implementation.

@artem1205 artem1205 requested a review from pnilan December 24, 2024 11:36
@artem1205 artem1205 requested a review from brianjlai December 24, 2024 11:36
…te-decoder-with-parsers

# Conflicts:
#	airbyte_cdk/sources/declarative/decoders/__init__.py
Signed-off-by: Artem Inzhyyants <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)

2773-2788: Would you consider enhancing the documentation for CompositeRawDecoder?

The experimental component could benefit from:

  1. A more detailed description explaining its purpose and use cases
  2. Example configurations showing how to chain parsers
  3. Clear warnings about potential breaking changes since it's experimental

wdyt?


2809-2821: Would you consider adding more CSV configuration options?

The CsvParser could benefit from additional configuration options commonly needed when parsing CSV files:

  • quote_char: Character used for quoting fields
  • escape_char: Character used for escaping special characters
  • has_header: Boolean indicating if the first row is a header
  • skip_rows: Number of rows to skip from the beginning

These options would make the parser more flexible for various CSV formats. wdyt?
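
A sketch of what the extended schema block might look like; the new property names (quote_char, escape_char, has_header, skip_rows) are illustrative and not part of the current component schema:

```yaml
CsvParser:
  type: object
  required:
    - type
  properties:
    type:
      type: string
      enum: [ CsvParser ]
    encoding:
      type: string
      default: utf-8
    delimiter:
      type: string
      default: ","
    quote_char:
      type: string
      default: "\""
    escape_char:
      type: string
    has_header:
      type: boolean
      default: true
    skip_rows:
      type: integer
      default: 0
```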

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)

1128-1130: Consider adding encoding validation.

Would it help to validate the encoding parameter against Python's standard encodings to prevent runtime errors? For example, using codecs.lookup() to verify the encoding exists. wdyt?

from codecs import lookup

class JsonLineParser(BaseModel):
    encoding: Optional[str] = "utf-8"

    @validator("encoding")
    def validate_encoding(cls, v):
        try:
            if v is not None:
                lookup(v)
            return v
        except LookupError:
            raise ValueError(f"Unknown encoding: {v}")

1132-1135: Consider adding parameter validation.

Would it be helpful to add validation for both parameters? For example:

  1. Validate encoding like in JsonLineParser
  2. Ensure delimiter is exactly one character long
class CsvParser(BaseModel):
    type: Literal["CsvParser"]
    encoding: Optional[str] = "utf-8"
    delimiter: Optional[str] = ","

    @validator("encoding")
    def validate_encoding(cls, v):
        try:
            if v is not None:
                lookup(v)
            return v
        except LookupError:
            raise ValueError(f"Unknown encoding: {v}")

    @validator("delimiter")
    def validate_delimiter(cls, v):
        if v is not None and len(v) != 1:
            raise ValueError("Delimiter must be exactly one character")
        return v
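
The two validators boil down to checks that can be exercised standalone; a minimal sketch as a plain function (pydantic omitted), assuming the same error messages:

```python
import codecs

def validate_csv_options(encoding="utf-8", delimiter=","):
    """Fail fast on bad parser options instead of erroring mid-stream."""
    try:
        codecs.lookup(encoding)  # raises LookupError for unknown encodings
    except LookupError:
        raise ValueError(f"Unknown encoding: {encoding}")
    if len(delimiter) != 1:
        raise ValueError("Delimiter must be exactly one character")
    return encoding, delimiter

ok = validate_csv_options("utf-8", ";")
try:
    validate_csv_options(encoding="not-a-codec")
    bad = None
except ValueError as exc:
    bad = str(exc)  # "Unknown encoding: not-a-codec"
```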

1517-1520: Consider validating inner_parser.

Would it be helpful to ensure inner_parser is always provided since it's required for the composite pattern to work? We could add a validator to raise a descriptive error if it's missing. wdyt?

class GzipParser(BaseModel):
    type: Literal["GzipParser"]
    inner_parser: Union[JsonLineParser, CsvParser]

    @validator("inner_parser")
    def validate_inner_parser(cls, v):
        if v is None:
            raise ValueError("inner_parser is required for GzipParser")
        return v

1552-1555: Consider validating parser.

Similar to GzipParser, would it be helpful to validate that parser is always provided? We could add a validator to raise a descriptive error if it's missing. wdyt?

class CompositeRawDecoder(BaseModel):
    type: Literal["CompositeRawDecoder"]
    parser: Union[GzipParser, JsonLineParser, CsvParser]

    @validator("parser")
    def validate_parser(cls, v):
        if v is None:
            raise ValueError("parser is required for CompositeRawDecoder")
        return v
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1725-1729: Consider adding error handling.

Would it be helpful to add a try/except block to handle potential encoding errors early? This could provide better error messages during parser creation rather than at runtime. wdyt?

@staticmethod
def create_jsonline_parser(
    model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        return JsonLineParser(encoding=model.encoding)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for JsonLineParser")

1747-1751: Consider adding error handling for inner parser creation.

Would it be helpful to add a try/except block to handle potential errors during inner parser creation? This could provide better error messages about which part of the composite failed. wdyt?

def create_gzip_parser(
    self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
    try:
        inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
        return GzipParser(inner_parser=inner_parser)
    except Exception as e:
        raise ValueError(f"Failed to create inner parser for GzipParser: {str(e)}")

1754-1755: Consider adding error handling.

Would it be helpful to add validation for both encoding and delimiter during parser creation? This could catch configuration issues early. wdyt?

@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        if model.delimiter and len(model.delimiter) != 1:
            raise ValueError("Delimiter must be exactly one character")
        return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for CsvParser")

1757-1761: Consider adding error handling for parser creation.

Would it be helpful to add a try/except block to handle potential errors during parser creation? This could provide better error messages about which parser failed to be created. wdyt?

def create_composite_raw_decoder(
    self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
    try:
        parser = self._create_component_from_model(model=model.parser, config=config)
        return CompositeRawDecoder(parser=parser)
    except Exception as e:
        raise ValueError(f"Failed to create parser for CompositeRawDecoder: {str(e)}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7604b99 and f9a97db.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/decoders/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/decoders/__init__.py
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

2789-2802: The GzipParser implementation looks good, but could benefit from explicit error handling.

Looking at the implementation and tests, I notice that while it handles normal cases well (including different encodings), it might be worth adding explicit error handling for:

  • Empty or zero-sized inputs
  • Corrupted gzip data
  • Memory constraints with large files

Would you consider adding these error cases to the schema? wdyt?


2803-2808: ⚠️ Potential issue

Should we add the required type field to JsonLineParser?

I notice that JsonLineParser is missing the required type field in its schema, unlike the other parsers. This could lead to inconsistency in validation. Would you consider adding it? wdyt?

  JsonLineParser:
    type: object
+   required:
+     - type
    properties:
+     type:
+       type: string
+       enum: [ JsonLineParser ]
      encoding:
        type: string
        default: utf-8

Likely invalid or redundant comment.


2769-2772: Verify the impact on existing decoder implementations.

The addition of CompositeRawDecoder to SimpleRetriever's decoder options looks good. However, let's verify that this doesn't affect existing decoder implementations. I'll help check for any potential conflicts:

✅ Verification successful

The addition of CompositeRawDecoder looks safe and well-integrated

Based on the codebase analysis, the CompositeRawDecoder is properly integrated into the existing decoder hierarchy and doesn't conflict with other decoders. Here's what I found:

  • It's properly defined in the schema alongside other decoders like JsonDecoder, XmlDecoder, etc.
  • The implementation follows the same pattern as other decoders
  • The factory code (ModelToComponentFactory) has proper handling for CompositeRawDecoder
  • No existing decoders are modified or impacted by this addition

The decoder is properly isolated and follows the existing patterns, making it a safe addition to the SimpleRetriever's decoder options.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing decoder implementations that might be affected
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):" --type python

# Search for decoder usage in existing manifests
echo "Searching for decoder usage in manifests..."
rg -l "decoder:" --type yaml

Length of output: 361


Script:

#!/bin/bash
# Let's try searching without file type restrictions since python extension wasn't recognized
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):"

# Let's also look for specific decoder usage patterns
echo "Searching for decoder references in code..."
rg "Decoder" -A 3

# Check for any existing composite decoder patterns
echo "Searching for composite decoder patterns..."
rg "CompositeRawDecoder|composite.*decoder" -i

Length of output: 77271

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

476-476: LGTM!

The mapping of CompositeRawDecoderModel to its factory method is correct.
